什么是 Protobuf
Protobuf(Protocol Buffer)是谷歌提出的一种数据交换格 式,是一套类似 JSON 或者 XML 的数据传输格式和规范,用于不同应用或进程之间的通信。Protobuf 具有以下特点:
- 语言和平台无关:Protobuf 支持 Java、C++、Python、JavaScript 等多种语言,支持跨多个平台。
- 小巧高效:比XML更小(3~10倍)、更快(20~100倍)、更为简单。
- 扩展性和兼容性好:可以更新数据结构,而不影响和破坏原有的旧程序。
与JSON、XML相比,Protobuf算是后起之秀,只是Protobuf更加适合于高性能、快速响应的数据传输应用场景。Protobuf数据包是一种二进制格式,相对于文本格式的数据交换(JSON、XML)来说,速度要快很多。Protobuf优异的性能使得它更加适用于分布式应用场景下的数据通信或者异构环境下的数据交换。
JSON、XML是文本格式,数据具有可读性;Protobuf是二进制数据格式,数据本身不具有可读性,只有反序列化之后才能得到真正可读的数据。正因为Protobuf是二进制数据格式,所以数据序列化之后体积相比JSON和XML要小,更加适合网络传输。
总体来说,在一个需要大量数据传输的应用场景中,数据量很大,选择Protobuf可以明显地减少传输的数据量和提升网络IO的速度。对于打造一款高性能的通信服务器来说,Protobuf传输协议是最高性能的传输协议之一。微信的消息传输就采用了Protobuf协议。
proto 文件简介
proto 文件头部声明和示例
Protobuf使用proto文件来预先定义的消息格式。数据包按照 proto文件所定义的消息格式完成二进制码流的编码和解码。proto 文件简单地说就是一个消息的协议文件,这个协议文件的后缀文件名为 “.proto”。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| syntax = "proto3"; package demo05;
option java_package = "demo05"; option java_outer_classname = "MsgProtos";
message Msg { uint32 id = 1; string content = 2; }
|
在.proto文件的头部声明中,需要声明一下所使用的Protobuf协议版本,示例中使用的是”proto3”版本。也可以使用旧一点的”proto2”版本,两个版本的消息格式有一些细微的不同,默认的协议版本为”proto2”。
Protobuf 支持很多语言,所以它为不同的语言提供了一些可选的配置选项,使用option关键字。option java_package选项的作用为: 在生成proto文件中消息的POJO类和Builder(构造者)的Java代码时,将生成的Java代码放入该选项所指定的package类路径中。option java_outer_classname 选项的作用为:在生成 proto 文件所对应的 Java 代码时,生成的 Java 外部类使用配置的名称。
在 proto 文件中,使用 message 关键字来定义消息的结构体。在生成proto对应的Java代码时,每个具体的消息结构体将对应于一个最终的Java POJO类。结构体的字段(Field)对应到POJO类的属性 (Attribute)。也就是说,每定义一个message结构体相当于声明一 个Java中的类。proto文件的message可以内嵌message,就像 Java的内部类一样。
在需要多个消息结构体时,proto文件可以像Java语言的类文件一样按照模块进行分开设计,所以一个项目可能有多个proto文件,一个文件在需要依赖其他proto文件时可以通过import导入。导入的操作, 这和Java的import操作大致相同。
字段类型
每个消息结构体可以有多个字段。定义一个字段的格式为“类型 名称 = 编号”。例如,“string content = 2;”表示该字段是 String类型,字段名为content,编号为2。字段编号表示在Protobuf 数据包的序列化、反序列化时该字段的具体排序。
分配标识号的取值范围为1~2^32(4294967296)。其中,编号 [1, 15]之内的分配标识号,时间和空间效率都是最高的。因为[1, 15]之内的标识号在编码的时候只会占用一个字节,[16, 2047] 之内的标识号要占用两个字节。所以,那些频繁出现的消息字段应该使用[1, 15]之内的标识号。切记:要为将来有可能添加的、频繁出现的字段预留一些标识号。另外,[1900, 2000]之内的标识号为Protobuf内部保留值,建议不要在自己的项目中使用。 标识号的特点是:一个消息结构体中的标识号是可以不连续的; 在同一个消息结构体中,不同的字段不能使用相同的标识号。

变长编码的类型(如int32)表示打包的字节并不是固定的,而是根据数据的大小或者长度来定的。例如int32,如果数值比较小,在 0~127时,就使用一个字节打包。
定长编码(如fixed32)和变长编码(如int32)的区别是: fixed32的打包效率比int32的效率高,但是使用的空间一般比int32 多。因此,定长编码时间效率高,变长编码空间效率高,可以根据项 目的实际情况选择。一般情况下可以选择fixed32,但是遇到对传输效率要求比较苛刻的环境时,可以选择int32。
在一个proto文件中可以声明多个message,大部分情况下会把存 在依赖关系或者包含关系的message结构体写入一个proto文件,将那些没有关系、相互独立的message结构体分别写入不同的文件,这样便于管理。
嵌套消息
proto文件支持嵌套消息。消息中既可以包含另一个消息实例作为其字段,也可以在消息中定义一个新的消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| message Outer { message MiddleA { message Inner { int64 value11 = 1; bool value22 = 2; } } message MiddleB { message Inner { int32 value11 = 1; bool value22 = 2; } } }
|
如果想在父消息类型的外部重复使用这些内部的消息类型,那么可以使用Parent.Type的形式来引用,例如:
1 2 3
| message SomeOtherMessage { Outer.MiddleA.Inner refvalue11 = 1; }
|
通过 proto 文件生成字节码
完成 “.proto” 文件定义后,下一步是生成消息的 POJO 类和 Builder(构造者)类。生成Java类有两种方式:一种是通过控制台命令;另一种是使用Maven插件。
控制台命令生成
首先从 https://github.com/protocolbuffers/protobuf/releases 下载 Protobuf 的安装包,可以选择不同的版本。
生成构造者代码需要用到安装文件中的 protoc.exe 可执行文件。 安装完成后,设置一下path环境变量,将 proto 的安装目录加入 path 环境变量中。 下面开始使用 protoc.exe 文件生成 Java 的 Builder(构造者),生 成的命令如下:
1
| protoc.exe --java_out=./src/main/java/ ./Msg.proto
|
在上面的命令中,使用的proto文件的名称为./Msg.proto,所生 成的POJO类和构造者类的输出文件夹为 ./src/main/java/。 使用命令行生成Java类的操作比较烦琐,另一种更加方便的方式 是使用 protobuf-maven-plugin 插件生成 Java 类。
通过 Maven 插件生成
使用protobuf-maven-plugin插件可以非常方便地生成消息的POJO 类和Builder(构造者)类的Java代码。在Maven的pom文件中增加依赖和此插件的配置项,具体如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <protobuf.version>4.28.2</protobuf.version> </properties>
<dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>${protobuf.version}</version> </dependency> </dependencies>
<build> <extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> <version>1.7.1</version> </extension> </extensions> <plugins> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.6.1</version> <configuration> <protoSourceRoot>${project.basedir}/protobuf</protoSourceRoot>
<protocArtifact> com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} </protocArtifact> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>test-compile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
|
protobuf/test01.proto
1 2 3 4 5 6 7 8 9 10
| syntax = "proto3"; package demo05;
option java_package = "demo05"; option java_outer_classname = "MsgProtos";
message Msg { uint32 id = 1; string content = 2; }
|
配置好之后,执行插件的 compile 命令,Java 代码就生成了;在 Maven 的项目编译时,POJO 类和 Builder 类也会自动生成。在实际的项目中,最好将 protobuf 作为一个单独的模块,其他模块如果用到了 protobuf,直接引用即可,方便解耦和管理。
使用Builder构造POJO对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class ProtobufDemo { public static MsgProtos.Msg buildMsg() { MsgProtos.Msg.Builder personBuilder = MsgProtos.Msg.newBuilder(); personBuilder.setId(1000); personBuilder.setContent("密涅瓦的猫头鹰在黄昏起飞。"); MsgProtos.Msg message = personBuilder.build(); return message; }
public static void main(String[] args) throws IOException { MsgProtos.Msg msg = buildMsg(); System.out.println(msg.getId()); System.out.println(msg.getContent()); System.out.println(); System.out.println(msg.toString());
byte[] data = msg.toByteArray(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); outputStream.write(data); data = outputStream.toByteArray(); MsgProtos.Msg inMsg = MsgProtos.Msg.parseFrom(data); System.out.println("id:=" + inMsg.getId()); System.out.println("content:=" + inMsg.getContent());
ByteArrayOutputStream outputStream2 = new ByteArrayOutputStream(); msg.writeTo(outputStream2); ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream2.toByteArray()); MsgProtos.Msg inMsg2 = MsgProtos.Msg.parseFrom(inputStream); System.out.println("id:=" + inMsg2.getId()); System.out.println("content:=" + inMsg2.getContent());
ByteArrayOutputStream outputStream3 = new ByteArrayOutputStream(); msg.writeDelimitedTo(outputStream3); ByteArrayInputStream inputStream3 = new ByteArrayInputStream(outputStream3.toByteArray()); MsgProtos.Msg inMsg3 = MsgProtos.Msg.parseDelimitedFrom(inputStream3); System.out.println("id:=" + inMsg3.getId()); System.out.println("content:=" + inMsg3.getContent()); } }
|
1 2 3 4 5 6 7 8 9 10 11 12
| 1000 密涅瓦的猫头鹰在黄昏起飞。
id: 1000 content: "\345\257\206\346\266\205\347\223\246\347\232\204\347\214\253\345\244\264\351\271\260\345\234\250\351\273\204\346\230\217\350\265\267\351\243\236\343\200\202"
id:=1000 content:=密涅瓦的猫头鹰在黄昏起飞。 id:=1000 content:=密涅瓦的猫头鹰在黄昏起飞。 id:=1000 content:=密涅瓦的猫头鹰在黄昏起飞。
|
Protobuf为每个message结构体生成的Java类中包含了一个POJO 类、一个Builder类。构造POJO消息,首先使用POJO 类的newBuilder 静态方法获得一个 Builder,其次POJO每一个字段的值需要通过Builder 的setter()方法去设置。字段值设置完成之后,使用构造者的build() 方法构造出POJO消息对象。
Protobuf 编解码案例
Netty 默认支持Protobuf的编码与解码,内置了一套基础的 Protobuf编码和解码器。Netty 内置的基础Protobuf编码器、解码器为ProtobufEncoder、 ProtobufDecoder。此外,还提供了一组简单的解决半包问题的编码器 和解码器。
ProtobufEncoder 编码器
ProtobufEncoder 的实现逻辑非常简单,直接调用了Protobuf POJO实例的toByteArray()方法将自身编码成二进制字节,然后放入Netty 的 ByteBuf 缓冲区中,接着会被发送到下一站编码器。其源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package io.netty.handler.codec.protobuf;
import com.google.protobuf.MessageLite; import com.google.protobuf.MessageLiteOrBuilder; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandler.Sharable; import io.netty.handler.codec.MessageToMessageEncoder; import java.util.List;
@Sharable public class ProtobufEncoder extends MessageToMessageEncoder<MessageLiteOrBuilder> { public ProtobufEncoder() { super(MessageLiteOrBuilder.class); }
protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception { if (msg instanceof MessageLite) { out.add(Unpooled.wrappedBuffer(((MessageLite)msg).toByteArray())); } else { if (msg instanceof MessageLite.Builder) { out.add(Unpooled.wrappedBuffer(((MessageLite.Builder)msg).build().toByteArray())); } } } }
|
ProtobufDecoder 解码器
ProtobufDecoder 和 ProtobufEncoder相互对应,只不过在使用的时候 ProtobufDecoder 解码器需要指定一个Protobuf POJO 实例作为解码的参考原型(prototype),解码时会根据原型实例找到对应的 Parser 解析器,将二进制的字节解码为 Protobuf POJO 实例。
1
| new ProtobufDecoder(MsgProtos.Msg.getDefaultInstance())
|
在 Java NIO 通信中,仅仅使用以上这组编码器和解码器,传输过程中会存在粘包/半包的问题。Netty 也提供了配套的 Head-Content 类型的 Protobuf 编码器和解码器,在二进制码流之前加上二进制字节数组的长度。
ProtobufVarint32LengthFieldPrepender 长度编码器
它的作用是在 ProtobufEncoder 生成的字节数组之前前置一个 varint32 数字,表示序列化的二进制字节数量或者长度。
ProtobufVarint32FrameDecoder 长度解码器
ProtobufVarint32FrameDecoder 和 ProtobufVarint32LengthFieldPrepender 相互对应,其作用是根据数据包中长度域(varint32类型)中的长度值解码一个足额的字节数组,然后将字节数组交给下一站的解码器 ProtobufDecoder。什么是varint32类型的长度?Protobuf为什么不用int这种固定类型的长度?
varint32是一种紧凑的表示数字的方法,不是一种固定长度(如 32位)的数字类型。varint32用一个或多个字节来表示一个数字,值越小,使用的字节数越少,值越大使用的字节数越多。varint32根据值的大小自动进行收缩,能够减少用于保存长度的字节数。也就是说,varint32与int类型的最大区别是:varint32用一个或多个字节来表示一个数字,int是固定长度的数字。varint32不是固定长度,所以为了更好地减少通信过程中的传输量,消息头中的长度尽量采用 varint格式。
至此,Netty内置的Protobuf编码器和解码器已经初步介绍完,可以通过这两组编码器/解码器完成Head-Content (Length + Protobuf Data)协议的数据传输。但是,在更加复杂的传输应用场景下,Netty 的内置编码器和解码器是不够用的。例如,在Head部分需要加上魔数字段进行安全验证或者需要对Protobuf字节内容进行加密和解密,或者在其他复杂的传输应用场景下,需要定制属于自己的Protobuf编码器和解码器。
Protobuf 传输案例
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
| import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
public class ProtoBufServer { private final int port; public ProtoBufServer(int port) { this.port = port; }
public void runServer() { EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1); EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try { ServerBootstrap b = new ServerBootstrap(); b.group(bossLoopGroup, workerLoopGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(new ProtobufDecoder(MsgProtos.Msg.getDefaultInstance()));
ch.pipeline().addLast(new ProtobufBusinessHandler()); } });
ChannelFuture f = b.bind(port).sync(); System.out.println("Protobuf 服务端启动成功,监听端口:" + port);
f.channel().closeFuture().sync(); } catch (Exception e) { System.out.println("服务端运行异常" + e.getMessage()); } finally { bossLoopGroup.shutdownGracefully(); workerLoopGroup.shutdownGracefully(); } }
static class ProtobufBusinessHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { MsgProtos.Msg protoMsg = (MsgProtos.Msg) msg; System.out.println("收到一个 Protobuf POJO =>>"); System.out.println("ID: " + protoMsg.getId()); System.out.println("Content: " + protoMsg.getContent()); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { System.err.println("发生异常:" + cause.getCause()); ctx.close(); } }
public static void main(String[] args) { new ProtoBufServer(9000).runServer(); } }
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class ProtoBufSendClient { private final String ip; private final int port; static String content = "密涅瓦的猫头鹰在黄昏起飞。";
public ProtoBufSendClient(String ip, int port) { this.ip = ip; this.port = port; }
public void runClient() { EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try { Bootstrap b = new Bootstrap(); b.group(workerLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder()); } });
System.out.println("正在连接服务器 " + ip + ":" + port + "..."); ChannelFuture f = b.connect(ip, port).sync();
Channel channel = f.channel(); for (int i = 0; i < 1000; i++) { MsgProtos.Msg msg = build(i, i + " -> " + content); channel.writeAndFlush(msg); System.out.println("已发送第 " + i + " 条报文"); } System.out.println("所有数据发送完成!");
} catch (Exception e) { System.err.println("客户端运行异常:" + e.getMessage()); } finally { workerLoopGroup.shutdownGracefully(); } }
public MsgProtos.Msg build(int id, String content) { MsgProtos.Msg.Builder builder = MsgProtos.Msg.newBuilder(); builder.setId(id); builder.setContent(content); return builder.build(); }
public static void main(String[] args) { new ProtoBufSendClient("127.0.0.1", 9000).runClient(); } }
|
标题:
Java NIO - Netty 使用 Protobuf 协议通信